/*

 * Licensed to the Apache Software Foundation (ASF) under one

 * or more contributor license agreements.  See the NOTICE file

 * distributed with this work for additional information

 * regarding copyright ownership.  The ASF licenses this file

 * to you under the Apache License, Version 2.0 (the

 * "License"); you may not use this file except in compliance

 * with the License.  You may obtain a copy of the License at

 *

 *     http://www.apache.org/licenses/LICENSE-2.0

 *

 * Unless required by applicable law or agreed to in writing, software

 * distributed under the License is distributed on an "AS IS" BASIS,

 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

 * See the License for the specific language governing permissions and

 * limitations under the License.

 */

package com.bff.gaia.unified.runners.core.construction;



import com.bff.gaia.unified.sdk.coders.Coder;

import com.bff.gaia.unified.sdk.coders.CoderException;

import com.bff.gaia.unified.sdk.coders.ListCoder;

import com.bff.gaia.unified.sdk.coders.NullableCoder;

import com.bff.gaia.unified.sdk.coders.SerializableCoder;

import com.bff.gaia.unified.sdk.coders.StructuredCoder;

import com.bff.gaia.unified.sdk.io.BoundedSource;

import com.bff.gaia.unified.sdk.io.BoundedSource.BoundedReader;

import com.bff.gaia.unified.sdk.io.Read;

import com.bff.gaia.unified.sdk.io.UnboundedSource;

import com.bff.gaia.unified.sdk.options.PipelineOptions;

import com.bff.gaia.unified.sdk.transforms.PTransform;

import com.bff.gaia.unified.sdk.transforms.display.DisplayData;

import com.bff.gaia.unified.sdk.transforms.windowing.BoundedWindow;

import com.bff.gaia.unified.sdk.util.NameUtils;

import com.bff.gaia.unified.sdk.values.PBegin;

import com.bff.gaia.unified.sdk.values.PCollection;

import com.bff.gaia.unified.sdk.values.TimestampedValue;

import com.bff.gaia.unified.vendor.guava.com.google.common.annotations.VisibleForTesting;

import com.bff.gaia.unified.vendor.guava.com.google.common.collect.ImmutableList;

import com.bff.gaia.unified.vendor.guava.com.google.common.collect.Lists;

import org.joda.time.Instant;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;



import javax.annotation.Nullable;

import java.io.IOException;

import java.io.InputStream;

import java.io.OutputStream;

import java.util.*;

import java.util.stream.Collectors;



import static com.bff.gaia.unified.vendor.guava.com.google.common.base.Preconditions.checkArgument;

import static com.bff.gaia.unified.vendor.guava.com.google.common.base.Preconditions.checkNotNull;



/**

 * {@link PTransform} that converts a {@link BoundedSource} as an {@link UnboundedSource}.

 *

 * <p>{@link BoundedSource} is read directly without calling {@link BoundedSource#split}, and

 * element timestamps are propagated. While any elements remain, the watermark is the beginning of

 * time {@link BoundedWindow#TIMESTAMP_MIN_VALUE}, and after all elements have been produced the

 * watermark goes to the end of time {@link BoundedWindow#TIMESTAMP_MAX_VALUE}.

 *

 * <p>Checkpoints are created by calling {@link BoundedReader#splitAtFraction} on inner {@link

 * BoundedSource}. Sources that cannot be split are read entirely into memory, so this transform

 * does not work well with large, unsplittable sources.

 *

 * <p>This transform is intended to be used by a runner during pipeline translation to convert a

 * Read.Bounded into a Read.Unbounded.

 */

public class UnboundedReadFromBoundedSource<T> extends PTransform<PBegin, PCollection<T>> {



  private static final Logger LOG = LoggerFactory.getLogger(UnboundedReadFromBoundedSource.class);



  // Using 64MB in cases where we cannot compute a valid estimated size for a source.

  private static final long DEFAULT_ESTIMATED_SIZE = 64 * 1024 * 1024;



  private final BoundedSource<T> source;



  /**

   * Constructs a {@link PTransform} that performs an unbounded read from a {@link BoundedSource}.

   */

  public UnboundedReadFromBoundedSource(BoundedSource<T> source) {

    this.source = source;

  }



  @Override

  public PCollection<T> expand(PBegin input) {

    return input.getPipeline().apply(Read.from(new BoundedToUnboundedSourceAdapter<>(source)));

  }



  @Override

  protected Coder<T> getDefaultOutputCoder() {

    return source.getDefaultOutputCoder();

  }



  @Override

  public String getKindString() {

    return String.format("Read(%s)", NameUtils.approximateSimpleName(source));

  }



  @Override

  public void populateDisplayData(DisplayData.Builder builder) {

    // We explicitly do not register base-class data, instead we use the delegate inner source.

    builder.add(DisplayData.item("source", source.getClass())).include("source", source);

  }



  /** A {@code BoundedSource} to {@code UnboundedSource} adapter. */

  @VisibleForTesting

  public static class BoundedToUnboundedSourceAdapter<T>

      extends UnboundedSource<T, BoundedToUnboundedSourceAdapter.Checkpoint<T>> {



    private BoundedSource<T> boundedSource;



    public BoundedToUnboundedSourceAdapter(BoundedSource<T> boundedSource) {

      this.boundedSource = boundedSource;

    }



    @Override

    public void validate() {

      boundedSource.validate();

    }



    @Override

    public List<BoundedToUnboundedSourceAdapter<T>> split(

        int desiredNumSplits, PipelineOptions options) throws Exception {

      try {

        long estimatedSize = boundedSource.getEstimatedSizeBytes(options);

        if (estimatedSize <= 0) {

          // Source is unable to provide a valid estimated size. So using default size.

          LOG.warn(

              "Cannot determine a valid estimated size for BoundedSource {}. Using default "

                  + "size of {} bytes",

              boundedSource,

              DEFAULT_ESTIMATED_SIZE);

          estimatedSize = DEFAULT_ESTIMATED_SIZE;

        }



        // Each split should at least be of size 1 byte.

        long desiredBundleSize = Math.max(estimatedSize / desiredNumSplits, 1);



        List<? extends BoundedSource<T>> splits = boundedSource.split(desiredBundleSize, options);

        if (splits.size() == 0) {

          splits = ImmutableList.of(boundedSource);

        }

        return splits.stream()

            .map(input -> new BoundedToUnboundedSourceAdapter<>(input))

            .collect(Collectors.toList());

      } catch (Exception e) {

        LOG.warn("Exception while splitting {}, skips the initial splits.", boundedSource, e);

        return ImmutableList.of(this);

      }

    }



    @Override

    public Reader createReader(PipelineOptions options, Checkpoint<T> checkpoint)

        throws IOException {

      if (checkpoint == null) {

        return new Reader(null /* residualElements */, boundedSource, options);

      } else {

        return new Reader(checkpoint.residualElements, checkpoint.residualSource, options);

      }

    }



    @Override

    public Coder<T> getDefaultOutputCoder() {

      return boundedSource.getDefaultOutputCoder();

    }



    @SuppressWarnings({"rawtypes", "unchecked"})

    @Override

    public Coder<Checkpoint<T>> getCheckpointMarkCoder() {

      return new CheckpointCoder<>(boundedSource.getDefaultOutputCoder());

    }



    /**

     * A marker representing the progress and state of an {@link BoundedToUnboundedSourceAdapter}.

     */

    @VisibleForTesting

    public static class Checkpoint<T> implements UnboundedSource.CheckpointMark {

      private final @Nullable

	  List<TimestampedValue<T>> residualElements;

      private final @Nullable

	  BoundedSource<T> residualSource;



      public Checkpoint(

          @Nullable List<TimestampedValue<T>> residualElements,

          @Nullable BoundedSource<T> residualSource) {

        this.residualElements = residualElements;

        this.residualSource = residualSource;

      }



      @Override

      public void finalizeCheckpoint() {}



      @VisibleForTesting

      @Nullable

      List<TimestampedValue<T>> getResidualElements() {

        return residualElements;

      }



      @VisibleForTesting

      @Nullable

      BoundedSource<T> getResidualSource() {

        return residualSource;

      }

    }



    @VisibleForTesting

    static class CheckpointCoder<T> extends StructuredCoder<Checkpoint<T>> {



      // The coder for a list of residual elements and their timestamps

      private final Coder<List<TimestampedValue<T>>> elemsCoder;

      // The coder from the BoundedReader for coding each element

      private final Coder<T> elemCoder;

      // The nullable and serializable coder for the BoundedSource.

      @SuppressWarnings("rawtypes")

      private final Coder<BoundedSource> sourceCoder;



      CheckpointCoder(Coder<T> elemCoder) {

        this.elemsCoder =

            NullableCoder.of(ListCoder.of(TimestampedValue.TimestampedValueCoder.of(elemCoder)));

        this.elemCoder = elemCoder;

        this.sourceCoder = NullableCoder.of(SerializableCoder.of(BoundedSource.class));

      }



      @Override

      public void encode(Checkpoint<T> value, OutputStream outStream)

          throws CoderException, IOException {

        elemsCoder.encode(value.residualElements, outStream);

        sourceCoder.encode(value.residualSource, outStream);

      }



      @SuppressWarnings("unchecked")

      @Override

      public Checkpoint<T> decode(InputStream inStream) throws CoderException, IOException {

        return new Checkpoint<>(elemsCoder.decode(inStream), sourceCoder.decode(inStream));

      }



      @Override

      public List<Coder<?>> getCoderArguments() {

        return Arrays.asList(elemCoder);

      }



      @Override

      public void verifyDeterministic() throws NonDeterministicException {

        throw new NonDeterministicException(

            this, "CheckpointCoder uses Java Serialization, which may be non-deterministic.");

      }

    }



    /**

     * An {@code UnboundedReader<T>} that wraps a {@code BoundedSource<T>} into {@link

     * ResidualElements} and {@link ResidualSource}.

     *

     * <p>In the initial state, {@link ResidualElements} is null and {@link ResidualSource} contains

     * the {@code BoundedSource<T>}. After the first checkpoint, the {@code BoundedSource<T>} will

     * be split into {@link ResidualElements} and {@link ResidualSource}.

     */

    @VisibleForTesting

    class Reader extends UnboundedReader<T> {

      // Initialized in init()

      private @Nullable

	  ResidualElements residualElements;

      private @Nullable

	  ResidualSource residualSource;

      private final PipelineOptions options;

      private boolean done;



      Reader(

          @Nullable List<TimestampedValue<T>> residualElementsList,

          @Nullable BoundedSource<T> residualSource,

          PipelineOptions options) {

        init(residualElementsList, residualSource, options);

        this.options = checkNotNull(options, "options");

        this.done = false;

      }



      private void init(

          @Nullable List<TimestampedValue<T>> residualElementsList,

          @Nullable BoundedSource<T> residualSource,

          PipelineOptions options) {

        this.residualElements =

            residualElementsList == null

                ? new ResidualElements(Collections.emptyList())

                : new ResidualElements(residualElementsList);

        this.residualSource =

            residualSource == null ? null : new ResidualSource(residualSource, options);

      }



      @Override

      public boolean start() throws IOException {

        return advance();

      }



      @Override

      public boolean advance() throws IOException {

        if (residualElements.advance()) {

          return true;

        } else if (residualSource != null && residualSource.advance()) {

          return true;

        } else {

          done = true;

          return false;

        }

      }



      @Override

      public void close() throws IOException {

        if (residualSource != null) {

          residualSource.close();

        }

      }



      @Override

      public T getCurrent() throws NoSuchElementException {

        if (residualElements.hasCurrent()) {

          return residualElements.getCurrent();

        } else if (residualSource != null) {

          return residualSource.getCurrent();

        } else {

          throw new NoSuchElementException();

        }

      }



      @Override

      public Instant getCurrentTimestamp() throws NoSuchElementException {

        if (residualElements.hasCurrent()) {

          return residualElements.getCurrentTimestamp();

        } else if (residualSource != null) {

          return residualSource.getCurrentTimestamp();

        } else {

          throw new NoSuchElementException();

        }

      }



      @Override

      public Instant getWatermark() {

        return done ? BoundedWindow.TIMESTAMP_MAX_VALUE : BoundedWindow.TIMESTAMP_MIN_VALUE;

      }



      /**

       * {@inheritDoc}

       *

       * <p>If only part of the {@link ResidualElements} is consumed, the new checkpoint will

       * contain the remaining elements in {@link ResidualElements} and the {@link ResidualSource}.

       *

       * <p>If all {@link ResidualElements} and part of the {@link ResidualSource} are consumed, the

       * new checkpoint is done by splitting {@link ResidualSource} into new {@link

       * ResidualElements} and {@link ResidualSource}. {@link ResidualSource} is the source split

       * from the current source, and {@link ResidualElements} contains rest elements from the

       * current source after the splitting. For unsplittable source, it will put all remaining

       * elements into the {@link ResidualElements}.

       */

      @Override

      public Checkpoint<T> getCheckpointMark() {

        Checkpoint<T> newCheckpoint;

        if (!residualElements.done()) {

          // Part of residualElements are consumed.

          // Checkpoints the remaining elements and residualSource.

          newCheckpoint =

              new Checkpoint<>(

                  residualElements.getRestElements(),

                  residualSource == null ? null : residualSource.getSource());

        } else if (residualSource != null) {

          newCheckpoint = residualSource.getCheckpointMark();

        } else {

          newCheckpoint = new Checkpoint<>(null /* residualElements */, null /* residualSource */);

        }

        // Re-initialize since the residualElements and the residualSource might be

        // consumed or split by checkpointing.

        init(newCheckpoint.residualElements, newCheckpoint.residualSource, options);

        return newCheckpoint;

      }



      @Override

      public BoundedToUnboundedSourceAdapter<T> getCurrentSource() {

        return BoundedToUnboundedSourceAdapter.this;

      }

    }



    private class ResidualElements {

      private final List<TimestampedValue<T>> elementsList;

      private @Nullable

	  Iterator<TimestampedValue<T>> elementsIterator;

      private @Nullable

	  TimestampedValue<T> currentT;

      private boolean hasCurrent;

      private boolean done;



      ResidualElements(List<TimestampedValue<T>> residualElementsList) {

        this.elementsList = checkNotNull(residualElementsList, "residualElementsList");

        this.elementsIterator = null;

        this.currentT = null;

        this.hasCurrent = false;

        this.done = false;

      }



      public boolean advance() {

        if (elementsIterator == null) {

          elementsIterator = elementsList.iterator();

        }

        if (elementsIterator.hasNext()) {

          currentT = elementsIterator.next();

          hasCurrent = true;

          return true;

        } else {

          done = true;

          hasCurrent = false;

          return false;

        }

      }



      boolean hasCurrent() {

        return hasCurrent;

      }



      boolean done() {

        return done;

      }



      TimestampedValue<T> getCurrentTimestampedValue() {

        if (!hasCurrent) {

          throw new NoSuchElementException();

        }

        return currentT;

      }



      T getCurrent() {

        return getCurrentTimestampedValue().getValue();

      }



      Instant getCurrentTimestamp() {

        return getCurrentTimestampedValue().getTimestamp();

      }



      List<TimestampedValue<T>> getRestElements() {

        if (elementsIterator == null) {

          return elementsList;

        } else {

          List<TimestampedValue<T>> newResidualElements = Lists.newArrayList();

          while (elementsIterator.hasNext()) {

            newResidualElements.add(elementsIterator.next());

          }

          return newResidualElements;

        }

      }

    }



    private class ResidualSource {

      private BoundedSource<T> residualSource;

      private PipelineOptions options;

      private @Nullable

	  BoundedReader<T> reader;

      private boolean closed;

      private boolean readerDone;



      public ResidualSource(BoundedSource<T> residualSource, PipelineOptions options) {

        this.residualSource = checkNotNull(residualSource, "residualSource");

        this.options = checkNotNull(options, "options");

        this.reader = null;

        this.closed = false;

        this.readerDone = false;

      }



      private boolean advance() throws IOException {

        checkArgument(!closed, "advance() call on closed %s", getClass().getName());

        if (readerDone) {

          return false;

        }

        if (reader == null) {

          reader = residualSource.createReader(options);

          readerDone = !reader.start();

        } else {

          readerDone = !reader.advance();

        }

        return !readerDone;

      }



      T getCurrent() throws NoSuchElementException {

        if (reader == null) {

          throw new NoSuchElementException();

        }

        return reader.getCurrent();

      }



      Instant getCurrentTimestamp() throws NoSuchElementException {

        if (reader == null) {

          throw new NoSuchElementException();

        }

        return reader.getCurrentTimestamp();

      }



      void close() throws IOException {

        if (reader != null) {

          reader.close();

          reader = null;

        }

        closed = true;

      }



      BoundedSource<T> getSource() {

        return residualSource;

      }



      Checkpoint<T> getCheckpointMark() {

        if (reader == null) {

          // Reader hasn't started, checkpoint the residualSource.

          return new Checkpoint<>(null /* residualElements */, residualSource);

        } else {

          // Part of residualSource are consumed.

          // Splits the residualSource and tracks the new residualElements in current source.

          BoundedSource<T> residualSplit = null;

          Double fractionConsumed = reader.getFractionConsumed();

          if (fractionConsumed != null && 0 <= fractionConsumed && fractionConsumed <= 1) {

            double fractionRest = 1 - fractionConsumed;

            int splitAttempts = 8;

            for (int i = 0; i < 8 && residualSplit == null; ++i) {

              double fractionToSplit = fractionConsumed + fractionRest * i / splitAttempts;

              residualSplit = reader.splitAtFraction(fractionToSplit);

            }

          }

          List<TimestampedValue<T>> newResidualElements = Lists.newArrayList();

          try {

            while (advance()) {

              newResidualElements.add(

                  TimestampedValue.of(reader.getCurrent(), reader.getCurrentTimestamp()));

            }

          } catch (IOException e) {

            throw new RuntimeException("Failed to read elements from the bounded reader.", e);

          }

          return new Checkpoint<>(newResidualElements, residualSplit);

        }

      }

    }

  }

}